🔨 Adding a job counter to address Semaphore issues #408
JonasKs merged 2 commits into python-arq:main
Conversation
Codecov Report
Additional details and impacted files

```diff
@@            Coverage Diff             @@
##             main     #408      +/-   ##
==========================================
- Coverage   98.66%   98.40%   -0.27%
==========================================
  Files          11       11
  Lines        1052     1063      +11
  Branches      199      200       +1
==========================================
+ Hits         1038     1046       +8
- Misses          6        8       +2
- Partials        8        9       +1
```
Continue to review full report in Codecov by Sentry.
Thank you! 😊 Would be great if you could add a test that ensures we don't regress in the future. 😊
Could you suggest a test? I couldn't come up with an appropriate one.
Set max_jobs to 1, queue a task, and ensure the health check is still logged? This should fail without this implementation, right? I'm not at home, but I'm pretty sure there's a health check test already implemented.
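For illustration, one way such a regression test might look, assuming a pytest-asyncio setup and an `arq_redis` pool fixture like the one in arq's own test suite. The names, the fixture, and the asserted log text are assumptions, not the test that was eventually added:

```python
import asyncio
import logging

import pytest
from arq.worker import Worker


async def long_running(ctx):
    # occupies the only job slot long enough for a health check to fire
    await asyncio.sleep(1)


@pytest.mark.asyncio
async def test_health_check_logged_while_max_jobs_busy(arq_redis, caplog):
    caplog.set_level(logging.INFO)
    await arq_redis.enqueue_job('long_running')

    worker = Worker(
        functions=[long_running],
        redis_pool=arq_redis,       # reuse the fixture's pool
        max_jobs=1,                 # the single slot gets held by long_running
        health_check_interval=0.1,  # record health very frequently
    )
    # Run the worker briefly, then cancel it; without the job counter the
    # main loop blocks on the semaphore and never records a health check.
    task = asyncio.create_task(worker.main())
    await asyncio.sleep(0.5)
    task.cancel()
    with pytest.raises(asyncio.CancelledError):
        await task

    # the exact health-check log text asserted here is an assumption
    assert any('health' in record.getMessage() for record in caplog.records)
```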
I have set up a test that aims to cancel the job when the max_jobs queue is full. The test is failing on Python 3.7: the logs show that the job was enqueued and cancelled, with a Redis ConnectionError stack trace between the two log lines. Can you help me debug when you have some time? I do think just re-running the CI job for 3.7 should resolve this.
JonasKs left a comment
Thank you 😊
This looks good to me.
Samuel is busy with v2 of Pydantic, so I wouldn't expect this to be merged until that happens, but you can always use your own fork in the meantime 😊
```python
if self.job_counter >= self.max_jobs:
    self.sem.release()
    return None
```
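For context, a hedged sketch of how a guard like this can fit into a worker's job-start path — simplified and illustrative, not the exact arq source. The semaphore gets one spare slot so the polling coroutine can always acquire it and inspect the counter, while the counter enforces the real `max_jobs` limit:

```python
import asyncio


class WorkerSketch:
    def __init__(self, max_jobs=10):
        self.max_jobs = max_jobs
        self.job_counter = 0
        # one spare slot so the poll loop can always acquire and check the counter
        self.sem = asyncio.BoundedSemaphore(max_jobs + 1)

    async def start_jobs(self, job_ids):
        for job_id in job_ids:
            # the spare slot keeps this from blocking while all job slots are busy
            await self.sem.acquire()
            if self.job_counter >= self.max_jobs:
                # all real slots are taken: give the spare slot back and bail out,
                # so the heartbeat/cancellation work in the main loop can run
                self.sem.release()
                return None
            self.job_counter += 1
            task = asyncio.create_task(self.run_job(job_id))
            task.add_done_callback(self._job_done)

    def _job_done(self, task):
        # decrement before releasing so the counter always matches the
        # number of slots held by running jobs
        self.job_counter -= 1
        self.sem.release()

    async def run_job(self, job_id):
        await asyncio.sleep(0.1)  # placeholder for actual job execution
```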
Hey @JonasKs! Any update on when this can be merged?
Sorry this took some time, been a bit busy lately. As for the release, I'm gonna have to put that back into @samuelcolvin's hands - not sure what he has planned. 😊
Issue #405
Having a semaphore in the same event loop, while the maximum number of jobs is running, blocks the worker's main loop and with it the heartbeat & cancellation tasks.
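A minimal sketch of the failure mode, assuming a simplified main loop rather than arq's actual code: once `MAX_JOBS` long-running jobs hold every semaphore slot, the loop is stuck awaiting the semaphore and never reaches the heartbeat step further down in the same coroutine.

```python
import asyncio

MAX_JOBS = 2


async def job(sem):
    async with sem:                # each job holds a slot while it "runs"
        await asyncio.sleep(3600)


async def main_loop(sem):
    while True:
        async with sem:            # blocks here once all MAX_JOBS slots are held
            pass                   # (polling the queue / starting jobs goes here)
        print('heartbeat')         # stand-in for health check + cancellation handling


async def main():
    sem = asyncio.Semaphore(MAX_JOBS)
    jobs = [asyncio.create_task(job(sem)) for _ in range(MAX_JOBS)]
    loop_task = asyncio.create_task(main_loop(sem))
    await asyncio.sleep(2)         # no 'heartbeat' is ever printed
    for t in (*jobs, loop_task):
        t.cancel()
    await asyncio.gather(*jobs, loop_task, return_exceptions=True)


asyncio.run(main())
```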
Possible solution
- Initialize the semaphore with `max_jobs + 1` slots
- Add a `job_counter` that increments when the semaphore is acquired and decrements when the semaphore is released
- When `job_counter == max_jobs`, release the semaphore immediately and return from the `start_jobs` function, basically foregoing starting new jobs

The solution seems to work. The counter is threadsafe because we always increment or decrement it before releasing the semaphore. Quick test app below (`arq_app.py`, `start_workers.py`, `cancellations.py`):

1. Run `arq_app.py` and check `worker.log` for the first heartbeat happening
2. Run `cancellations.py` and check `worker.log` for the latest heartbeat + cancellation + new job
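The original attachments aren't reproduced here, but a minimal `arq_app.py`-style worker along these lines would exercise the scenario. The task body and settings are illustrative, not the author's exact files:

```python
# arq_app.py — hedged reconstruction of the kind of test app described above
import asyncio

from arq.connections import RedisSettings


async def long_task(ctx):
    # hold the single job slot long enough to watch heartbeats in worker.log
    await asyncio.sleep(60)


class WorkerSettings:
    functions = [long_task]
    redis_settings = RedisSettings()
    max_jobs = 1                # one slot makes the starvation easy to observe
    health_check_interval = 5   # a heartbeat should be recorded every 5 seconds
```

Running this with arq's CLI (`arq arq_app.WorkerSettings`) and enqueueing a couple of `long_task` jobs via `enqueue_job('long_task')` from a pool created with `arq.connections.create_pool` should keep the heartbeat appearing in the log even while the only job slot is occupied; without the job counter, it stops as soon as the first job starts.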